import atexit
import base64
import datetime
import functools
import json
import logging
import os
import random
import socket
import tempfile
import time
import urllib3
import yaml

from collections import defaultdict
from copy import deepcopy
from http.client import HTTPException
from urllib3.exceptions import HTTPError
from threading import Condition, Lock, Thread
from typing import Any, Callable, Collection, Dict, List, Optional, Tuple, Type, Union, TYPE_CHECKING

from . import AbstractDCS, Cluster, ClusterConfig, Failover, Leader, Member, Status, SyncState, TimelineHistory
from ..collections import EMPTY_DICT
from ..exceptions import DCSError
from ..postgresql.mpp import AbstractMPP
from ..utils import deep_compare, iter_response_objects, keepalive_socket_options, \
    Retry, RetryFailedError, tzutc, uri, USER_AGENT
if TYPE_CHECKING:  # pragma: no cover
    from ..config import Config

logger = logging.getLogger(__name__)

# Location of the kubeconfig file read by `K8sConfig.load_kube_config()`; the KUBECONFIG environment variable,
# when set, takes precedence over the default path (`~` is expanded at the time the file is opened).
KUBE_CONFIG_DEFAULT_LOCATION = os.environ.get('KUBECONFIG', '~/.kube/config')
# Names of the environment variables which must hold the API server host and port for the in-cluster config.
SERVICE_HOST_ENV_NAME = 'KUBERNETES_SERVICE_HOST'
SERVICE_PORT_ENV_NAME = 'KUBERNETES_SERVICE_PORT'
# Service account token and CA certificate files used by `K8sConfig.load_incluster_config()`.
SERVICE_TOKEN_FILENAME = '/var/run/secrets/kubernetes.io/serviceaccount/token'
SERVICE_CERT_FILENAME = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt'
# Paths of the files created by `_create_temp_file()`; they are removed by `_cleanup_temp_files()` at exit.
__temp_files: List[str] = []


class KubernetesError(DCSError):
    """DCS-level error signalling that a request to the Kubernetes API could not be completed."""


def _cleanup_temp_files() -> None:
    """Delete all files listed in the module-level ``__temp_files`` registry and empty the registry.

    Removal is best effort: a file that can't be deleted (:exc:`OSError`) is skipped silently.
    """
    global __temp_files
    for path in __temp_files:
        try:
            os.remove(path)
        except OSError:
            continue  # nothing useful can be done about it, go on with the remaining files
    __temp_files = []


def _create_temp_file(content: bytes) -> str:
    """Store *content* in a new temporary file which is removed when the interpreter exits.

    The first call (while the registry of temporary files is empty) registers
    :func:`_cleanup_temp_files` with :mod:`atexit`.

    :param content: raw bytes to be written to the file.

    :returns: path to the created file.
    """
    if not __temp_files:
        atexit.register(_cleanup_temp_files)

    fd, name = tempfile.mkstemp()
    # Register the file before writing, so that it is cleaned up at exit even if the write below fails.
    __temp_files.append(name)
    # A file object writes the whole buffer (a bare `os.write()` may write only a part of it)
    # and the context manager closes the descriptor also when the write raises.
    with os.fdopen(fd, 'wb') as f:
        f.write(content)
    return name


# this function does the same mapping of snake_case => camelCase for > 97% of cases as autogenerated swagger code
def to_camel_case(value: str) -> str:
    """Convert a ``snake_case`` identifier to ``camelCase``.

    The first word is kept as is. Every following word is capitalized with :meth:`str.title`,
    except the well-known abbreviations listed below, which are upper-cased as a whole.

    :param value: the ``snake_case`` name, e.g. ``resource_version`` or ``pod_ip``.

    :returns: the converted name, e.g. ``resourceVersion`` or ``podIP``.
    """
    abbreviations = {'api', 'apiv3', 'cidr', 'cpu', 'csi', 'id', 'io', 'ip', 'ipc', 'pid', 'tls', 'uri', 'url', 'uuid'}
    head, *tail = value.split('_')
    pieces = [head]
    for word in tail:
        pieces.append(word.upper() if word in abbreviations else word.title())
    return ''.join(pieces)


class K8sConfig(object):
    """Connection settings for the K8s API: server URL, request headers and ``urllib3`` pool options.

    A freshly created object holds only the default headers. The real settings are loaded either with
    :meth:`load_incluster_config` (service account files and environment variables) or with
    :meth:`load_kube_config` (a kubeconfig file).
    """

    class ConfigException(Exception):
        """Raised when a configuration source is missing, empty or incomplete."""

    def __init__(self) -> None:
        self.pool_config: Dict[str, Any] = {'maxsize': 10, 'num_pools': 10}  # urllib3.PoolManager config
        # `datetime.max` means that the token never expires; only `_read_token_file()` sets a real deadline
        self._token_expires_at = datetime.datetime.max
        self._headers: Dict[str, str] = {}
        self._make_headers()

    def _set_token(self, token: str) -> None:
        """Store *token* in the ``authorization`` header as a bearer token."""
        self._headers['authorization'] = 'Bearer ' + token

    def _make_headers(self, token: Optional[str] = None, **kwargs: Any) -> None:
        """Rebuild the default request headers from scratch.

        :param token: optional bearer token to put into the ``authorization`` header.
        :param kwargs: passed as is to :func:`urllib3.make_headers` (e.g. ``basic_auth``).
        """
        self._headers = urllib3.make_headers(user_agent=USER_AGENT, **kwargs)
        if token:
            self._set_token(token)

    def _read_token_file(self) -> str:
        """Read the service account token and schedule the next refresh of it.

        :returns: content of the token file.

        :raises:
            :exc:`ConfigException`: if the token file doesn't exist or is empty.
        """
        if not os.path.isfile(SERVICE_TOKEN_FILENAME):
            raise self.ConfigException('Service token file does not exists.')
        with open(SERVICE_TOKEN_FILENAME) as f:
            token = f.read()
        if not token:
            raise self.ConfigException('Token file exists but empty.')
        self._token_expires_at = datetime.datetime.now() + self._token_refresh_interval
        return token

    def load_incluster_config(self, ca_certs: str = SERVICE_CERT_FILENAME,
                              token_refresh_interval: datetime.timedelta = datetime.timedelta(minutes=1)) -> None:
        """Configure access to the K8s API from the environment variables and the service account files.

        :param ca_certs: path to the CA certificate file used to verify the API server.
        :param token_refresh_interval: how often the token file must be re-read (see :attr:`headers`).

        :raises:
            :exc:`ConfigException`: if the service host/port variables are not set or empty,
                or if the certificate or the token file is missing or empty.
        """
        if SERVICE_HOST_ENV_NAME not in os.environ or SERVICE_PORT_ENV_NAME not in os.environ:
            raise self.ConfigException('Service host/port is not set.')
        if not os.environ[SERVICE_HOST_ENV_NAME] or not os.environ[SERVICE_PORT_ENV_NAME]:
            raise self.ConfigException('Service host/port is set but empty.')

        if not os.path.isfile(ca_certs):
            raise self.ConfigException('Service certificate file does not exists.')
        with open(ca_certs) as f:
            if not f.read():
                raise self.ConfigException('Cert file exists but empty.')
        self.pool_config['ca_certs'] = ca_certs
        # must be set before `_read_token_file()`, which uses it to calculate the expiration time
        self._token_refresh_interval = token_refresh_interval
        token = self._read_token_file()
        self._make_headers(token=token)
        self._server = uri('https', (os.environ[SERVICE_HOST_ENV_NAME], os.environ[SERVICE_PORT_ENV_NAME]))

    @staticmethod
    def _get_by_name(config: Dict[str, List[Dict[str, Any]]], section: str, name: str) -> Optional[Dict[str, Any]]:
        """Find the entry called *name* in the ``<section>s`` list of the kubeconfig.

        :param config: parsed kubeconfig.
        :param section: singular section name: ``context``, ``cluster`` or ``user``.
        :param name: value of the ``name`` key of the wanted entry.

        :returns: value of the *section* key of the matching entry or ``None`` if nothing matches.
        """
        for c in config[section + 's']:
            if c['name'] == name:
                return c[section]
        return None

    @classmethod
    def _require_by_name(cls, config: Dict[str, List[Dict[str, Any]]], section: str, name: str) -> Dict[str, Any]:
        """Same as :meth:`_get_by_name`, but a missing entry is reported as :exc:`ConfigException`."""
        entry = cls._get_by_name(config, section, name)
        if not isinstance(entry, dict):
            raise cls.ConfigException('No {0} named "{1}" found in the kube config.'.format(section, name))
        return entry

    def _pool_config_from_file_or_data(self, config: Dict[str, str], file_key_name: str, pool_key_name: str) -> None:
        """Copy a certificate/key setting from the kubeconfig into :attr:`pool_config`.

        The inline base64 encoded ``<file_key_name>-data`` value has priority: it is decoded and stored
        in a temporary file. Otherwise the path from *file_key_name* is used. Nothing happens if
        neither of the keys is present.

        :param config: ``user`` or ``cluster`` section of the kubeconfig.
        :param file_key_name: kubeconfig key holding a file path, e.g. ``client-certificate``.
        :param pool_key_name: name of the ``urllib3`` pool option to set, e.g. ``cert_file``.
        """
        data_key_name = file_key_name + '-data'
        if data_key_name in config:
            self.pool_config[pool_key_name] = _create_temp_file(base64.b64decode(config[data_key_name]))
        elif file_key_name in config:
            self.pool_config[pool_key_name] = config[file_key_name]

    def load_kube_config(self, context: Optional[str] = None) -> None:
        """Configure access to the K8s API from the kubeconfig file (``KUBE_CONFIG_DEFAULT_LOCATION``).

        :param context: name of the context to use, the ``current-context`` from the file if not specified.

        :raises:
            :exc:`ConfigException`: if the file doesn't contain a mapping or if the context,
                or the cluster/user it refers to, is not defined in the file.
        """
        with open(os.path.expanduser(KUBE_CONFIG_DEFAULT_LOCATION)) as f:
            config: Dict[str, Any] = yaml.safe_load(f)
        if not isinstance(config, dict):  # e.g. an empty file is parsed as `None`
            raise self.ConfigException('Kube config file is empty or malformed.')

        context = context or config['current-context']
        if TYPE_CHECKING:  # pragma: no cover
            assert isinstance(context, str)
        context_value = self._require_by_name(config, 'context', context)
        cluster = self._require_by_name(config, 'cluster', context_value['cluster'])
        user = self._require_by_name(config, 'user', context_value['user'])

        self._server = cluster['server'].rstrip('/')
        if self._server.startswith('https'):
            self._pool_config_from_file_or_data(user, 'client-certificate', 'cert_file')
            self._pool_config_from_file_or_data(user, 'client-key', 'key_file')
            self._pool_config_from_file_or_data(cluster, 'certificate-authority', 'ca_certs')
            self.pool_config['cert_reqs'] = 'CERT_NONE' if cluster.get('insecure-skip-tls-verify') else 'CERT_REQUIRED'
        # token authentication has priority over username/password
        if user.get('token'):
            self._make_headers(token=user['token'])
        elif 'username' in user and 'password' in user:
            self._make_headers(basic_auth=':'.join((user['username'], user['password'])))

    @property
    def server(self) -> str:
        """Base URL of the K8s API server; available only after one of the ``load_*`` methods succeeded."""
        return self._server

    @property
    def headers(self) -> Dict[str, str]:
        """A copy of the default request headers.

        If the service account token has expired it is re-read from the file first. A failure to
        re-read it is only logged and the headers with the old token are returned.
        """
        if self._token_expires_at <= datetime.datetime.now():
            try:
                self._set_token(self._read_token_file())
            except Exception as e:
                logger.error('Failed to refresh service account token: %r', e)
        return self._headers.copy()


class K8sObject(object):
    """Attribute-style wrapper around a JSON object received from the K8s API.

    Nested dicts are recursively wrapped into :class:`K8sObject` (except ``annotations`` and ``labels``
    containing only string values) and the keys can be read as ``snake_case`` attributes:
    ``obj.metadata.resource_version`` reads ``obj['metadata']['resourceVersion']``.
    A key that is not present reads as ``None``.
    """

    def __init__(self, kwargs: Dict[str, Any]) -> None:
        self._dict = {k: self._wrap(k, v) for k, v in kwargs.items()}

    def get(self, name: str, default: Optional[Any] = None) -> Optional[Any]:
        """Get the value stored under the exact (``camelCase``) key *name* or *default* if there is no such key."""
        return self._dict.get(name, default)

    def __getattr__(self, name: str) -> Any:
        # This method is called only when the regular attribute lookup fails. We must raise `AttributeError`:
        # * for `_dict`: it is missing on instances created with `__new__()` (copy/deepcopy/pickle)
        #   and `self.get()` below would otherwise call us again and again until `RecursionError`;
        # * for special methods (`__setstate__`, `__deepcopy__`, ...): returning `None` makes
        #   the caller believe that the hook is implemented.
        if name == '_dict' or name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)
        return self.get(to_camel_case(name))

    @classmethod
    def _wrap(cls, parent: Optional[str], value: Any) -> Any:
        """Recursively convert dicts found in *value* into objects of this class.

        :param parent: the key *value* is stored under, ``None`` for list elements.
        :param value: a value decoded from JSON.

        :returns: the wrapped dict, a list of wrapped elements or *value* itself for other types.
        """
        if isinstance(value, dict):
            data_dict: Dict[str, Any] = value
            # we know that `annotations` and `labels` are dicts and therefore don't want to convert them into K8sObject
            return data_dict if parent in {'annotations', 'labels'} and \
                all(isinstance(v, str) for v in data_dict.values()) else cls(data_dict)
        elif isinstance(value, list):
            data_list: List[Any] = value
            return [cls._wrap(None, v) for v in data_list]
        else:
            return value

    def to_dict(self) -> Dict[str, Any]:
        """Return the underlying dict (not a copy); nested objects are not unwrapped."""
        return self._dict

    def __repr__(self) -> str:
        # nested K8sObject values are serialized with the help of the `default` hook
        return json.dumps(self, indent=4, default=lambda o: o.to_dict())


class K8sException(Exception):
    """Base class for the errors raised by the K8s client when a request can't be delivered."""


class K8sConnectionFailed(K8sException):
    """Raised when none of the known K8s API server nodes could be reached."""


class K8sClient(object):
    """Lightweight K8s API client built on top of ``urllib3``.

    The class works as a namespace for :class:`rest.ApiException`, :class:`ApiClient` and :class:`CoreV1Api`.
    Any other attribute read from an instance (e.g. ``k8s_client.V1ObjectMeta``) returns a dynamically
    created and cached :class:`_K8sObjectTemplate` subclass with that name.
    """

    class rest(object):
        """Namespace for :class:`ApiException` (it is referenced as ``k8s_client.rest.ApiException``)."""

        class ApiException(Exception):
            """Error describing a non-successful response of the K8s API.

            :ivar status: HTTP status code.
            :ivar reason: HTTP reason phrase.
            :ivar body: response body, ``None`` if the exception wasn't built from a response.
            :ivar headers: response headers, ``None`` if the exception wasn't built from a response.
            """

            def __init__(self, status: Optional[int] = None, reason: Optional[str] = None,
                         http_resp: Optional[urllib3.HTTPResponse] = None) -> None:
                # values from the `http_resp` (if it is given) have priority over `status` and `reason`
                self.status = http_resp.status if http_resp else status
                self.reason = http_resp.reason if http_resp else reason
                self.body = http_resp.data if http_resp else None
                self.headers = http_resp.headers if http_resp else None

            def __str__(self) -> str:
                error_message = "({0})\nReason: {1}\n".format(self.status, self.reason)
                if self.headers:
                    error_message += "HTTP response headers: {0}\n".format(self.headers)
                if self.body:
                    error_message += "HTTP response body: {0}\n".format(self.body)
                return error_message

    class ApiClient(object):
        """HTTP transport: executes requests, optionally switching between the K8s API server nodes.

        With ``bypass_api_service`` enabled the addresses of the API server nodes are read from the
        ``kubernetes`` endpoint in the ``default`` namespace, cached, and requests are sent directly
        to one of them instead of the URL from ``k8s_config``.
        """

        _API_URL_PREFIX = '/api/v1/namespaces/'

        def __init__(self, bypass_api_service: Optional[bool] = False) -> None:
            self._bypass_api_service = bypass_api_service
            self.pool_manager = urllib3.PoolManager(**k8s_config.pool_config)
            self._base_uri = k8s_config.server
            self._api_servers_cache = [k8s_config.server]
            self._api_servers_cache_updated = 0
            self.set_api_servers_cache_ttl(10)
            self.set_read_timeout(10)
            try:
                self._load_api_servers_cache()
            except K8sException:
                # `_update_api_servers_cache` stays `True`, therefore `request()` will try to load it again
                pass

        def set_read_timeout(self, timeout: Union[int, float]) -> None:
            """Set the total time budget used by `_calculate_timeouts()` when no explicit timeout is given."""
            self._read_timeout = timeout

        def set_api_servers_cache_ttl(self, ttl: int) -> None:
            """Set for how long (in seconds) the list of API servers is considered fresh.

            The stored value is half a second smaller than *ttl*.
            """
            self._api_servers_cache_ttl = ttl - 0.5

        def set_base_uri(self, value: str) -> None:
            """Make *value* the preferred API server for the following requests."""
            logger.info('Selected new K8s API server endpoint %s', value)
            # We will connect by IP of the K8s master node which is not listed as alternative name
            self.pool_manager.connection_pool_kw['assert_hostname'] = False
            self._base_uri = value

        @staticmethod
        def _handle_server_response(response: urllib3.HTTPResponse,
                                    _preload_content: bool) -> Union[urllib3.HTTPResponse, K8sObject]:
            """Convert *response* into the return value of an API call.

            :param response: the response received from the API server.
            :param _preload_content: whether the body must be decoded as JSON and wrapped into `K8sObject`.

            :returns: `K8sObject` built from the body or the untouched *response* if not *_preload_content*.

            :raises:
                :exc:`K8sClient.rest.ApiException`: if the HTTP status is outside of the 200-205 range.
            """
            if response.status not in range(200, 206):
                raise k8s_client.rest.ApiException(http_resp=response)
            return K8sObject(json.loads(response.data.decode('utf-8'))) if _preload_content else response

        @staticmethod
        def _make_headers(headers: Optional[Dict[str, str]]) -> Dict[str, str]:
            """Return the default headers from `k8s_config` updated with the request specific *headers*."""
            ret = k8s_config.headers
            ret.update(headers or {})
            return ret

        @property
        def api_servers_cache(self) -> List[str]:
            """The cached API server URLs, the currently preferred one (if it is in the cache) goes first."""
            base_uri, cache = self._base_uri, self._api_servers_cache
            return ([base_uri] if base_uri in cache else []) + [machine for machine in cache if machine != base_uri]

        def _get_api_servers(self, api_servers_cache: List[str]) -> List[str]:
            """Get addresses of the API server nodes from the ``kubernetes`` endpoint in the ``default`` namespace.

            The servers from *api_servers_cache* are queried one by one until the first useful answer.

            :param api_servers_cache: base URLs of the servers to ask.

            :returns: randomly shuffled ``https`` URLs of the API server nodes.

            :raises:
                :exc:`K8sClient.rest.ApiException`: if the request was rejected with the 403 status.
                :exc:`K8sConnectionFailed`: if no server returned a non-empty list of addresses.
            """
            _, per_node_timeout, per_node_retries = self._calculate_timeouts(len(api_servers_cache))
            headers = self._make_headers({})
            kwargs = {'preload_content': True, 'retries': per_node_retries,
                      'timeout': urllib3.Timeout(connect=max(1.0, per_node_timeout / 2.0), total=per_node_timeout)}
            path = self._API_URL_PREFIX + 'default/endpoints/kubernetes'
            for base_uri in api_servers_cache:
                try:
                    response = self.pool_manager.request('GET', base_uri + path, headers=headers, **kwargs)
                    endpoint = self._handle_server_response(response, True)
                    if TYPE_CHECKING:  # pragma: no cover
                        assert isinstance(endpoint, K8sObject)
                    for subset in endpoint.subsets:
                        for port in subset.ports:
                            if port.name == 'https' and port.protocol == 'TCP':
                                addresses = [uri('https', (a.ip, port.port)) for a in subset.addresses]
                                if addresses:
                                    random.shuffle(addresses)
                                    return addresses
                except Exception as e:
                    # 403 is handled by the caller, everything else means "try the next server"
                    if isinstance(e, k8s_client.rest.ApiException) and e.status == 403:
                        raise
                    self.pool_manager.clear()
                    logger.error('Failed to get "kubernetes" endpoint from %s: %r', base_uri, e)
            raise K8sConnectionFailed('No more K8s API server nodes in the cluster')

        def _refresh_api_servers_cache(self, updating_cache: Optional[bool] = False) -> None:
            """Refresh the list of API servers and make sure that the preferred server is one of them.

            :param updating_cache: if ``True`` the list is loaded via the URL from `k8s_config`
                                   and a failure is reported with an exception, otherwise the cached
                                   servers are asked and a failure to reach them is silently ignored.

            :raises:
                :exc:`K8sException`: if *updating_cache* is set and the list could not be obtained.
            """
            if self._bypass_api_service:
                try:
                    api_servers_cache = [k8s_config.server] if updating_cache else self.api_servers_cache
                    self._api_servers_cache = self._get_api_servers(api_servers_cache)
                    if updating_cache:
                        self.pool_manager.clear()
                except k8s_client.rest.ApiException:  # 403 Permission denied
                    logger.warning("Kubernetes RBAC doesn't allow GET access to the 'kubernetes' "
                                   "endpoint in the 'default' namespace. Disabling 'bypass_api_service'.")
                    self._bypass_api_service = False
                    self._api_servers_cache = [k8s_config.server]
                    if not updating_cache:
                        self.pool_manager.clear()
                except K8sConnectionFailed:
                    if updating_cache:
                        raise K8sException("Could not get the list of K8s API server nodes")
                    # keep the old cache and the old timestamp, the refresh will be attempted again
                    return
            else:
                self._api_servers_cache = [k8s_config.server]

            if self._base_uri not in self._api_servers_cache:
                self.set_base_uri(self._api_servers_cache[0])
            self._api_servers_cache_updated = time.time()

        def refresh_api_servers_cache(self) -> None:
            """Refresh the list of API servers if ``bypass_api_service`` is enabled and the cache TTL expired."""
            if self._bypass_api_service and time.time() - self._api_servers_cache_updated > self._api_servers_cache_ttl:
                self._refresh_api_servers_cache()

        def _load_api_servers_cache(self) -> None:
            """Load the list of API servers from scratch.

            The `_update_api_servers_cache` flag is reset only on success, i.e. it stays ``True``
            if the refresh raised an exception and `request()` will call this method again.
            """
            self._update_api_servers_cache = True
            self._refresh_api_servers_cache(True)
            self._update_api_servers_cache = False

        def _calculate_timeouts(self, api_servers: int, timeout: Optional[float] = None) -> Tuple[int, float, int]:
            """Calculate a request timeout and number of retries per single K8s API server node.
            In case if the timeout per node is too small (less than one second) we will reduce the number of nodes.
            For the cluster with only one API server node we will try to do 1 retry.
            No retries for clusters with 2 or more API server nodes. We better rely on switching to a different node.

            :param api_servers: number of the known API server nodes.
            :param timeout: total time budget in seconds, the read timeout is used if it is not given (or zero).

            :returns: a tuple: number of nodes that fit into the budget (``0`` if even a single node doesn't fit),
                      timeout per single attempt, number of ``urllib3`` retries per node."""

            per_node_timeout = timeout = float(timeout or self._read_timeout)

            max_retries = 3 - min(api_servers, 2)
            per_node_retries = 1
            min_timeout = 1.0

            while api_servers > 0:
                per_node_timeout = float(timeout) / api_servers
                if per_node_timeout >= min_timeout:
                    # for small clusters we will try to do more than one try on every node
                    while per_node_retries < max_retries and per_node_timeout / (per_node_retries + 1) >= min_timeout:
                        per_node_retries += 1
                    per_node_timeout /= per_node_retries
                    break
                # if the timeout per one node is too small try to reduce number of nodes
                api_servers -= 1
                max_retries = 1

            # `per_node_retries` counts attempts, while urllib3 wants the number of *re*tries
            return api_servers, per_node_timeout, per_node_retries - 1

        def _do_http_request(self, retry: Optional[Retry], api_servers_cache: List[str],
                             method: str, path: str, **kwargs: Any) -> urllib3.HTTPResponse:
            """Send the request to the servers from *api_servers_cache* one by one until the first response.

            :param retry: if it is falsy the first connection problem is immediately reported to the caller.
            :param api_servers_cache: base URLs of the API servers in the order they must be tried.
            :param method: HTTP method.
            :param path: URL path, it is appended to the base URL of a server.
            :param kwargs: passed as is to ``urllib3.PoolManager.request()``.

            :returns: the response of the first server that answered (whatever HTTP status it has).

            :raises:
                :exc:`K8sException`: if the request failed and *retry* is not allowed.
                :exc:`K8sConnectionFailed`: if the request failed on all servers.
            """
            some_request_failed = False
            for i, base_uri in enumerate(api_servers_cache):
                if i > 0:
                    logger.info('Retrying on %s', base_uri)
                try:
                    response = self.pool_manager.request(method, base_uri + path, **kwargs)
                    if some_request_failed:
                        # stick to the server which works and check what happened with the others
                        self.set_base_uri(base_uri)
                        self._refresh_api_servers_cache()
                    return response
                except (HTTPError, HTTPException, socket.error, socket.timeout) as e:
                    self.pool_manager.clear()
                    if not retry:
                        # switch to the next node if request failed and retry is not allowed
                        if i + 1 < len(api_servers_cache):
                            self.set_base_uri(api_servers_cache[i + 1])
                        raise K8sException('{0} {1} request failed'.format(method, path))
                    logger.error('Request to server %s failed: %r', base_uri, e)
                    some_request_failed = True

            raise K8sConnectionFailed('No more API server nodes in the cluster')

        def request(
                self, retry: Optional[Retry], method: str, path: str,
                timeout: Union[int, float, Tuple[Union[int, float], Union[int, float]], urllib3.Timeout, None] = None,
                **kwargs: Any) -> urllib3.HTTPResponse:
            """Execute the HTTP request, if *retry* is given keep trying until its time budget is exhausted.

            :param retry: the `Retry` object providing the sleep time and the stop time, ``None`` disables retries.
            :param method: HTTP method.
            :param path: URL path, it is appended to the base URL of an API server.
            :param timeout: a number (total timeout), a ``(connect, read)`` tuple or ``urllib3.Timeout``.
                            If it is not given the timeout and the number of ``urllib3`` retries
                            are calculated from the read timeout and the number of known API servers.
            :param kwargs: passed as is to ``urllib3.PoolManager.request()``.

            :returns: the response of the API server.

            :raises:
                :exc:`K8sException`: if the request failed and *retry* is not allowed.
                :exc:`K8sConnectionFailed`: if all servers are unreachable and there is no time for one more attempt.
            """
            if self._update_api_servers_cache:
                self._load_api_servers_cache()

            api_servers_cache = self.api_servers_cache
            api_servers = len(api_servers_cache)

            if timeout:
                if isinstance(timeout, (int, float)):
                    timeout = urllib3.Timeout(total=timeout)
                elif isinstance(timeout, tuple) and len(timeout) == 2:
                    timeout = urllib3.Timeout(connect=timeout[0], read=timeout[1])
                retries = 0
            else:
                _, timeout, retries = self._calculate_timeouts(api_servers)
                timeout = urllib3.Timeout(connect=max(1.0, timeout / 2.0), total=timeout)
            kwargs.update(retries=retries, timeout=timeout)

            while True:
                try:
                    return self._do_http_request(retry, api_servers_cache, method, path, **kwargs)
                except K8sConnectionFailed as ex:
                    try:
                        self._load_api_servers_cache()
                        api_servers_cache = self.api_servers_cache
                        api_servers = len(api_servers_cache)
                    except Exception as e:
                        logger.debug('Failed to update list of K8s master nodes: %r', e)

                    if TYPE_CHECKING:  # pragma: no cover
                        assert isinstance(retry, Retry)  # K8sConnectionFailed is raised only if retry is not None!
                    sleeptime = retry.sleeptime
                    remaining_time = (retry.stoptime or time.time()) - sleeptime - time.time()
                    nodes, timeout, retries = self._calculate_timeouts(api_servers, remaining_time)
                    if nodes == 0:
                        # out of time: give up and force reloading of the servers list on the next request
                        self._update_api_servers_cache = True
                        raise ex
                    retry.sleep_func(sleeptime)
                    retry.update_delay()
                    # We still have some time left. Partially reduce `api_servers_cache` and retry request
                    kwargs.update(timeout=urllib3.Timeout(connect=max(1.0, timeout / 2.0), total=timeout),
                                  retries=retries)
                    api_servers_cache = api_servers_cache[:nodes]

        def call_api(self, method: str, path: str, headers: Optional[Dict[str, str]] = None,
                     body: Optional[Any] = None, _retry: Optional[Retry] = None, _preload_content: bool = True,
                     _request_timeout: Optional[float] = None, **kwargs: Any) -> Union[urllib3.HTTPResponse, K8sObject]:
            """Call the K8s API and convert the response.

            :param method: HTTP method.
            :param path: path relative to ``/api/v1/namespaces/``.
            :param headers: request specific headers, they are merged into the default ones.
            :param body: object to be sent as a JSON body.
            :param _retry: passed to :meth:`request` as ``retry``.
            :param _preload_content: whether the response must be read and converted to `K8sObject`.
            :param _request_timeout: passed to :meth:`request` as ``timeout``.
            :param kwargs: request fields, their names are converted from ``snake_case`` to ``camelCase``.

            :returns: `K8sObject` or the raw response, see :meth:`_handle_server_response`.
            """
            headers = self._make_headers(headers)
            fields = {to_camel_case(k): v for k, v in kwargs.items()}  # resource_version => resourceVersion
            body = json.dumps(body, default=lambda o: o.to_dict()) if body is not None else None

            response = self.request(_retry, method, self._API_URL_PREFIX + path, headers=headers, fields=fields,
                                    body=body, preload_content=_preload_content, timeout=_request_timeout)

            return self._handle_server_response(response, _preload_content)

    class CoreV1Api(object):
        """Dynamic implementation of the ``(action)_namespaced_(kind)`` methods on top of `ApiClient.call_api()`."""

        def __init__(self, api_client: Optional['K8sClient.ApiClient'] = None) -> None:
            self._api_client = api_client or k8s_client.ApiClient()

        def __getattr__(self, func: str) -> Callable[..., Any]:
            """Build the function executing the API call encoded in the name *func*.

            :param func: name matching the ``(action)_namespaced_(kind)`` pattern, e.g. ``list_namespaced_pod``.

            :returns: the function doing the API call. Depending on the action it expects
                      ``(namespace)``, ``(namespace, body)``, ``(name, namespace)`` or
                      ``(name, namespace, body)`` as positional arguments, keyword arguments
                      are passed to `ApiClient.call_api()`.

            :raises:
                :exc:`AttributeError`: if *func* doesn't match the pattern. That keeps `hasattr()`
                    and `getattr()` with a default working for arbitrary names.
            """
            # `func` name pattern: (action)_namespaced_(kind)
            # (read|list|create|patch|replace|delete|delete_collection)
            action, separator, kind = func.partition('_namespaced_')
            if not (action and separator and kind) or '_namespaced_' in kind:
                raise AttributeError(func)
            kind = kind.replace('_', '') + ('s' * int(kind[-1] != 's'))  # plural, single word

            def wrapper(*args: Any, **kwargs: Any) -> Union[urllib3.HTTPResponse, K8sObject]:
                # for actions not listed in the dict the first word of the action is the HTTP method
                method = {'read': 'GET', 'list': 'GET', 'create': 'POST',
                          'replace': 'PUT'}.get(action, action.split('_')[0]).upper()

                if action == 'create' or len(args) == 1:  # namespace is a first argument and name in not in arguments
                    path = '/'.join([args[0], kind])
                else:  # name, namespace followed by optional body
                    path = '/'.join([args[1], kind, args[0]])

                headers = {'Content-Type': 'application/strategic-merge-patch+json'} if action == 'patch' else {}

                if len(args) == 3:  # name, namespace, body
                    body = args[2]
                elif action == 'create':  # namespace, body
                    body = args[1]  # pyright: ignore [reportGeneralTypeIssues]
                elif action == 'delete':  # name, namespace
                    body = kwargs.pop('body', None)
                else:
                    body = None

                return self._api_client.call_api(method, path, headers, body, **kwargs)
            return wrapper

    class _K8sObjectTemplate(K8sObject):
        """The template for objects which we create locally, e.g. k8s_client.V1ObjectMeta & co"""
        def __init__(self, **kwargs: Any) -> None:
            # unlike in the parent class only the names are converted, the values are stored as is
            self._dict = {to_camel_case(k): v for k, v in kwargs.items()}

    def __init__(self) -> None:
        self.__cls_cache: Dict[str, Type['K8sClient._K8sObjectTemplate']] = {}
        self.__cls_lock = Lock()

    def __getattr__(self, name: str) -> Type['K8sClient._K8sObjectTemplate']:
        # every unknown attribute is a "model" class, which is created on the first access and cached
        with self.__cls_lock:
            if name not in self.__cls_cache:
                self.__cls_cache[name] = type(name, (self._K8sObjectTemplate,), {})
        return self.__cls_cache[name]


# Module-level singletons shared by all classes below. `k8s_config` is created with the default headers only,
# the actual connection settings are loaded later (see `Kubernetes.__init__()`).
k8s_client = K8sClient()
k8s_config = K8sConfig()


class KubernetesRetriableException(k8s_client.rest.ApiException):
    """An `ApiException` that is worth retrying; it carries the same status, reason, body and headers."""

    def __init__(self, orig: K8sClient.rest.ApiException) -> None:
        super().__init__(orig.status, orig.reason)
        # the parent constructor fills these only from a response object, copy them from the original
        self.body = orig.body
        self.headers = orig.headers

    @property
    def sleeptime(self) -> Optional[int]:
        """Value of the ``retry-after`` response header as an integer.

        :returns: number of seconds or ``None`` if the header is missing or is not an integer.
        """
        try:
            retry_after = (self.headers or EMPTY_DICT).get('retry-after', '')
            return int(retry_after)
        except Exception:
            return None


class CoreV1ApiProxy(object):
    """Proxy class to work with k8s_client.CoreV1Api() object"""

    _DEFAULT_RETRIABLE_HTTP_CODES = frozenset([500, 503, 504])

    def __init__(self, use_endpoints: Optional[bool] = False, bypass_api_service: Optional[bool] = False) -> None:
        """Create the underlying API client and the `CoreV1Api` object using it.

        :param use_endpoints: whether the ``*_kind`` methods must work with `Endpoints` instead of `ConfigMaps`.
        :param bypass_api_service: passed as is to `k8s_client.ApiClient`.
        """
        api_client = k8s_client.ApiClient(bypass_api_service)
        self._api_client = api_client
        self._core_v1_api = k8s_client.CoreV1Api(api_client)
        self._use_endpoints = bool(use_endpoints)
        self._retriable_http_codes = set(self._DEFAULT_RETRIABLE_HTTP_CODES)

    def configure_timeouts(self, loop_wait: int, retry_timeout: Union[int, float], ttl: int) -> None:
        """Set up TCP keepalive options, the read timeout and the TTL of the API servers cache.

        :param loop_wait: becomes the TTL of the API servers cache.
        :param retry_timeout: becomes the read timeout of the API client.
        :param ttl: time after which a silent connection should be considered dead.
        """
        # Normally every loop_wait seconds we should have receive something from the socket.
        # If we didn't received anything after the loop_wait + retry_timeout it is a time
        # to start worrying (send keepalive messages). Finally, the connection should be
        # considered as dead if we received nothing from the socket after the ttl seconds.
        socket_options = list(keepalive_socket_options(ttl, int(loop_wait + retry_timeout)))
        self._api_client.pool_manager.connection_pool_kw['socket_options'] = socket_options
        self._api_client.set_read_timeout(retry_timeout)
        self._api_client.set_api_servers_cache_ttl(loop_wait)

    def configure_retriable_http_codes(self, retriable_http_codes: List[int]) -> None:
        """Extend the default set of retriable HTTP status codes with *retriable_http_codes*."""
        self._retriable_http_codes = self._DEFAULT_RETRIABLE_HTTP_CODES.union(retriable_http_codes)

    def refresh_api_servers_cache(self) -> None:
        """Delegate to `ApiClient.refresh_api_servers_cache()`."""
        self._api_client.refresh_api_servers_cache()

    def __getattr__(self, func: str) -> Callable[..., Any]:
        """Intercepts calls to `CoreV1Api` methods.

        Handles two important cases:
        1. Depending on whether Patroni is configured to work with `ConfigMaps` or `Endpoints`
           it remaps "virtual" method names from `*_kind` to `*_endpoints` or `*_config_map`.
        2. It handles HTTP error codes and raises `KubernetesRetriableException`
           if the given error is supposed to be handled with retry."""

        if func.endswith('_kind'):
            real_kind = 'endpoints' if self._use_endpoints else 'config_map'
            # only the word "kind" is replaced, the underscore in front of it is kept
            func = func[:-len('kind')] + real_kind

        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return getattr(self._core_v1_api, func)(*args, **kwargs)
            except k8s_client.rest.ApiException as e:
                if e.status in self._retriable_http_codes:
                    raise KubernetesRetriableException(e)
                # the server explicitly asked to repeat the request later
                if e.headers and 'retry-after' in e.headers:
                    raise KubernetesRetriableException(e)
                raise
        return wrapper

    @property
    def use_endpoints(self) -> bool:
        """Whether the ``*_kind`` methods are mapped to `Endpoints` (``True``) or `ConfigMaps` (``False``)."""
        return self._use_endpoints


def _run_and_handle_exceptions(method: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call *method* and translate the errors of the K8s client.

    :param method: the callable to execute.
    :param args: positional arguments for *method*.
    :param kwargs: keyword arguments for *method*.

    :returns: the result of *method* or ``False`` if the API responded with an error.
              Errors other than 409 are logged with a traceback.

    :raises:
        :exc:`KubernetesError`: if *method* raised :exc:`RetryFailedError` or :exc:`K8sException`.
    """
    try:
        result = method(*args, **kwargs)
    except k8s_client.rest.ApiException as exc:
        status = exc.status
        if status == 403:
            logger.exception('Permission denied')
        elif status != 409:  # Object exists or conflict in resource_version
            logger.exception('Unexpected error from Kubernetes API')
        return False
    except (RetryFailedError, K8sException) as exc:
        raise KubernetesError(exc)
    return result


def catch_kubernetes_errors(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator for the methods of `Kubernetes` which must never raise errors of the K8s client.

    The decorated method is executed via :func:`_run_and_handle_exceptions` and
    in addition :exc:`KubernetesError` is turned into the ``False`` return value.

    :param func: the method to decorate.

    :returns: the wrapped method; thanks to :func:`functools.wraps` it keeps the name and the docstring of *func*.
    """
    @functools.wraps(func)
    def wrapper(self: 'Kubernetes', *args: Any, **kwargs: Any) -> Any:
        try:
            return _run_and_handle_exceptions(func, self, *args, **kwargs)
        except KubernetesError:
            return False
    return wrapper


class ObjectCache(Thread):
    """Daemon thread maintaining a local cache of K8s objects with the help of the "list + watch" requests.

    The thread starts itself from the constructor and runs forever: it lists objects with the given
    function, remembers them in the cache and then applies events received from the watch request.
    """

    def __init__(self, dcs: 'Kubernetes', func: Callable[..., Any], retry: Retry,
                 condition: Condition, name: Optional[str] = None) -> None:
        """Initialize the cache and start the thread.

        :param dcs: the `Kubernetes` object; its ``leader_path``, ``config_path`` and ``event`` are used.
        :param func: the function used for both the "list" and the "watch" requests.
        :param retry: the `Retry` object, a copy of it is passed to the "list" requests.
        :param condition: it is notified when the cache becomes ready.
        :param name: name of this pod.
        """
        super(ObjectCache, self).__init__()
        self.daemon = True
        self._dcs = dcs
        self._func = func
        self._retry = retry
        self._condition = condition
        # NOTE(review): `threading.Thread` appears to keep the thread name in `_name` as well - confirm
        # that overwriting it here is harmless.
        self._name = name  # name of this pod
        self._is_ready = False
        self._response: Union[urllib3.HTTPResponse, bool, None] = None  # needs to be accessible from the `kill_stream`
        self._response_lock = Lock()  # protect the `self._response` from concurrent access
        self._object_cache: Dict[str, K8sObject] = {}
        self._object_cache_lock = Lock()
        # object name => name of the annotation, which value is compared to detect changes in `_process_event()`
        self._annotations_map = {self._dcs.leader_path: getattr(self._dcs, '_LEADER'),
                                 self._dcs.config_path: getattr(self._dcs, '_CONFIG')}  # pyright
        self.start()

    def _list(self) -> K8sObject:
        """Execute the "list" request with a copy of the retry object.

        After a failure it sleeps for one second before re-raising the exception,
        what throttles the endless loop in :meth:`run`.
        """
        try:
            return self._func(_retry=self._retry.copy())
        except Exception:
            time.sleep(1)
            raise

    def _watch(self, resource_version: str) -> urllib3.HTTPResponse:
        """Start the "watch" request from the given *resource_version* and return the not yet read response."""
        # the timeout is a tuple; presumably it ends up as (connect, read) in `ApiClient.request()` - verify
        return self._func(_request_timeout=(self._retry.deadline, urllib3.Timeout.DEFAULT_TIMEOUT),
                          _preload_content=False, watch=True, resource_version=resource_version)

    def set(self, name: str, value: K8sObject) -> Tuple[bool, Optional[K8sObject]]:
        """Store *value* in the cache unless the cache already holds the same or a newer version of the object.

        :param name: name of the object.
        :param value: the new version of the object.

        :returns: a tuple: whether the *value* was stored and the previously cached object (or ``None``).
        """
        with self._object_cache_lock:
            old_value = self._object_cache.get(name)
            # resource versions are compared as integers
            ret = not old_value or int(old_value.metadata.resource_version) < int(value.metadata.resource_version)
            if ret:
                self._object_cache[name] = value
        return ret, old_value

    def delete(self, name: str, resource_version: str) -> Tuple[bool, Optional[K8sObject]]:
        """Remove the object from the cache if the cached version is older than *resource_version*.

        :param name: name of the object.
        :param resource_version: resource version of the deleted object.

        :returns: a tuple: ``True`` if the object was removed or was not in the cache at all,
                  and the previously cached object (or ``None``).
        """
        with self._object_cache_lock:
            old_value = self._object_cache.get(name)
            ret = old_value and int(old_value.metadata.resource_version) < int(resource_version)
            if ret:
                del self._object_cache[name]
        return bool(not old_value or ret), old_value

    def copy(self) -> Dict[str, K8sObject]:
        """Return a shallow copy of the cache."""
        with self._object_cache_lock:
            return self._object_cache.copy()

    def get(self, name: str) -> Optional[K8sObject]:
        """Return the cached object with the given *name* or ``None``."""
        with self._object_cache_lock:
            return self._object_cache.get(name)

    def _process_event(self, event: Dict[str, Union[Any, Dict[str, Union[Any, Dict[str, Any]]]]]) -> None:
        """Apply a single watch *event* to the cache and wake up the HA loop if something important changed.

        :param event: decoded event with the ``type`` and ``object`` keys.
        """
        ev_type = event['type']
        obj = event['object']
        name = obj['metadata']['name']

        new_value = None
        if ev_type in ('ADDED', 'MODIFIED'):
            obj = K8sObject(obj)
            success, old_value = self.set(name, obj)
            if success:
                new_value = (obj.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, ''))
        elif ev_type == 'DELETED':
            success, old_value = self.delete(name, obj['metadata']['resourceVersion'])
        else:
            return logger.warning('Unexpected event type: %s', ev_type)

        # events for pods only update the cache
        if success and obj.get('kind') != 'Pod':
            if old_value:
                old_value = (old_value.metadata.annotations or EMPTY_DICT).get(self._annotations_map.get(name, ''))

            # for the config object only a change from one existing value to another one counts
            value_changed = old_value != new_value and \
                (name != self._dcs.config_path or old_value is not None and new_value is not None)

            if value_changed:
                logger.debug('%s changed from %s to %s', name, old_value, new_value)

            # Do not wake up HA loop if we run as leader and received leader object update event
            if value_changed or name == self._dcs.leader_path and self._name != new_value:
                self._dcs.event.set()

    @staticmethod
    def _finish_response(response: urllib3.HTTPResponse) -> None:
        """Close the *response* and release its connection even if closing failed."""
        try:
            response.close()
        finally:
            response.release_conn()

    def _do_watch(self, resource_version: str) -> None:
        """Execute the watch request and process received events until the stream ends or expires.

        :param resource_version: the resource version to start watching from.
        """
        with self._response_lock:
            self._response = None
        response = self._watch(resource_version)
        with self._response_lock:
            # `kill_stream()` replaces `None` with `False` if it was called while we were sending the request
            if self._response is None:
                self._response = response

        if not self._response:
            return self._finish_response(response)

        for event in iter_response_objects(response):
            # stop watching when the object in the event carries the code 410, the caller starts from scratch
            if event['object'].get('code') == 410:
                break
            self._process_event(event)

    def _build_cache(self) -> None:
        """Do one "list + watch" cycle: rebuild the cache, report readiness and follow the changes.

        When the watch is over (for any reason) the cache is marked as not ready
        and the response of the watch request is closed.
        """
        objects = self._list()
        with self._object_cache_lock:
            self._object_cache = {item.metadata.name: item for item in objects.items}
        with self._condition:
            self._is_ready = True
            self._condition.notify()

        try:
            self._do_watch(objects.metadata.resource_version)
        finally:
            with self._condition:
                self._is_ready = False
            with self._response_lock:
                response, self._response = self._response, None
            if isinstance(response, urllib3.HTTPResponse):
                self._finish_response(response)

    def kill_stream(self) -> None:
        """Interrupt the running watch request.

        If the response of the watch request is already available its socket is shut down and closed.
        Otherwise `_response` is set to ``False``, what makes :meth:`_do_watch` drop the response
        as soon as it gets it.
        """
        sock = None
        with self._response_lock:
            if isinstance(self._response, urllib3.HTTPResponse):
                try:
                    sock = self._response.connection.sock if self._response.connection else None
                except Exception:
                    sock = None
            else:
                self._response = False
        if sock:
            try:
                sock.shutdown(socket.SHUT_RDWR)
                sock.close()
            except Exception as e:
                logger.debug('Error on socket.shutdown: %r', e)

    def run(self) -> None:
        """Thread body: repeat the "list + watch" cycle forever, errors are only logged."""
        while True:
            try:
                self._build_cache()
            except Exception as e:
                logger.error('ObjectCache.run %r', e)

    def is_ready(self) -> bool:
        """Must be called only when holding the lock on `_condition`"""
        return self._is_ready


class Kubernetes(AbstractDCS):

    def __init__(self, config: Dict[str, Any], mpp: AbstractMPP) -> None:
        """Set up labels, retry settings, the Kubernetes API client and the two object caches.

        :param config: the configuration mapping; ``labels``, ``scope`` and ``retry_timeout`` are accessed
                       by key and therefore must be present, all other keys are optional.
        :param mpp: the MPP handler object, passed through to the constructor of the parent class.
        """
        self._labels = deepcopy(config['labels'])
        self._labels[config.get('scope_label', 'cluster-name')] = config['scope']
        # the selector is built before the MPP group label is added below, so it doesn't contain the group label
        self._label_selector = ','.join('{0}={1}'.format(k, v) for k, v in self._labels.items())
        self._namespace = config.get('namespace') or 'default'
        self._role_label = config.get('role_label', 'role')
        self._leader_label_value = config.get('leader_label_value', 'master')
        self._follower_label_value = config.get('follower_label_value', 'replica')
        self._standby_leader_label_value = config.get('standby_leader_label_value', 'master')
        self._tmp_role_label = config.get('tmp_role_label')
        # the environment variable takes precedence over the ``cacert`` config value
        self._ca_certs = os.environ.get('PATRONI_KUBERNETES_CACERT', config.get('cacert')) or SERVICE_CERT_FILENAME
        # the parent class gets an empty ``namespace``; the K8s namespace is kept in ``self._namespace``
        super(Kubernetes, self).__init__({**config, 'namespace': ''}, mpp)
        if self._mpp.is_enabled():
            self._labels[self._mpp.k8s_group_label] = str(self._mpp.group)

        self._retry = Retry(deadline=config['retry_timeout'], max_delay=1, max_tries=-1,
                            retry_exceptions=KubernetesRetriableException)
        self._ttl = int(config.get('ttl') or 30)
        # try the in-cluster configuration first and fall back to the local kube config
        try:
            k8s_config.load_incluster_config(ca_certs=self._ca_certs)
        except k8s_config.ConfigException:
            k8s_config.load_kube_config(context=config.get('context', 'kind-kind'))

        # no IP address is prepared for the leader endpoint when ``self._ctl`` is set
        self.__ips: List[str] = [] if self._ctl else [config.get('pod_ip', '')]
        self.__ports: List[K8sObject] = []
        # every configured port becomes a V1EndpointPort; the port number defaults to 5432
        ports: List[Dict[str, Any]] = config.get('ports', [{}])
        for p in ports:
            port: Dict[str, Any] = {'port': int(p.get('port', '5432'))}
            port.update({n: p[n] for n in ('name', 'protocol') if p.get(n)})
            self.__ports.append(k8s_client.V1EndpointPort(**port))

        bypass_api_service = not self._ctl and config.get('bypass_api_service')
        self._api = CoreV1ApiProxy(config.get('use_endpoints'), bypass_api_service)
        self._should_create_config_service = self._api.use_endpoints
        self.reload_config(config)
        # leader_observed_record, leader_resource_version, and leader_observed_time are used only for leader race!
        self._leader_observed_record: Dict[str, str] = {}
        self._leader_observed_time = None
        self._leader_resource_version = None
        # when set, the next call of ``watch()`` returns immediately
        self.__do_not_watch = False

        # both caches below share this condition
        self._condition = Condition()

        pods_func = functools.partial(self._api.list_namespaced_pod, self._namespace,
                                      label_selector=self._label_selector)
        self._pods = ObjectCache(self, pods_func, self._retry, self._condition)

        kinds_func = functools.partial(self._api.list_namespaced_kind, self._namespace,
                                       label_selector=self._label_selector)
        self._kinds = ObjectCache(self, kinds_func, self._retry, self._condition, self._name)

    def retry(self, method: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Call ``method`` through a private copy of the configured retry object.

        The copy is also handed to ``method`` as the ``_retry`` keyword argument.

        :param method: the callable to execute.
        :returns: whatever the retry object returns for this call.
        """
        retry_copy = self._retry.copy()
        kwargs.update(_retry=retry_copy)
        return retry_copy(method, *args, **kwargs)

    def client_path(self, path: str) -> str:
        """Translate ``path`` into an object name: drop the first character and replace ``/`` with ``-``."""
        full_path = super(Kubernetes, self).client_path(path)
        without_first_char = full_path[1:]
        return without_first_char.replace('/', '-')

    @property
    def leader_path(self) -> str:
        """Name of the leader object.

        With Endpoints in use the last 7 characters of the parent's value are cut off
        (presumably the ``-leader`` suffix); otherwise the parent's value is returned unchanged.
        """
        base_path = super(Kubernetes, self).leader_path
        if self._api.use_endpoints:
            return base_path[:-7]
        return base_path

    def set_ttl(self, ttl: int) -> Optional[bool]:
        """Remember the new TTL value.

        If the value differs from the current one, the next :meth:`watch` call is asked to return immediately.

        :param ttl: the new TTL, converted with :func:`int`.
        :returns: always ``None``.
        """
        new_ttl = int(ttl)
        self.__do_not_watch = new_ttl != self._ttl
        self._ttl = new_ttl
        return None

    @property
    def ttl(self) -> int:
        """The TTL value currently in effect."""
        current_ttl = self._ttl
        return current_ttl

    def set_retry_timeout(self, retry_timeout: int) -> None:
        """Use ``retry_timeout`` as the new deadline of the retry object.

        :param retry_timeout: the new deadline value.
        """
        retry_object = self._retry
        retry_object.deadline = retry_timeout

    def reload_config(self, config: Union['Config', Dict[str, Any]]) -> None:
        """Handle dynamic config changes.

        Triggered either by changes in the local configuration file + SIGHUP or by changes of dynamic configuration.

        Timeouts of the API client are reconfigured from ``loop_wait``, the retry deadline and ``ttl``, and
        the list of retriable HTTP status codes is refreshed. An invalid ``retriable_http_codes`` value is
        reported with a warning and otherwise ignored.

        :param config: the new configuration.
        """

        super(Kubernetes, self).reload_config(config)
        if TYPE_CHECKING:  # pragma: no cover
            assert self._retry.deadline is not None
        self._api.configure_timeouts(self.loop_wait, self._retry.deadline, self.ttl)

        # retriable_http_codes supposed to be either int, list of integers or comma-separated string with integers.
        # The raw value is kept for the warning below: looking it up again with ``config[...]`` inside
        # the exception handler would raise KeyError when the key is absent and hide the original error.
        raw_codes: Union[str, List[Union[str, int]]] = config.get('retriable_http_codes', [])
        retriable_http_codes = raw_codes
        if not isinstance(retriable_http_codes, list):
            retriable_http_codes = [c.strip() for c in str(retriable_http_codes).split(',')]

        try:
            self._api.configure_retriable_http_codes([int(c) for c in retriable_http_codes])
        except Exception as e:
            logger.warning('Invalid value of retriable_http_codes = %s: %r', raw_codes, e)

    @staticmethod
    def member(pod: K8sObject) -> Member:
        """Build a :class:`Member` object from a Pod.

        The ``status`` annotation (empty string when missing) is used as the member value and the labels
        of the Pod are stored in the member data under the ``pod_labels`` key.

        :param pod: the Pod object.
        :returns: the :class:`Member` object.
        """
        meta = pod.metadata
        status = (meta.annotations or EMPTY_DICT).get('status', '')
        result = Member.from_node(meta.resource_version, meta.name, None, status)
        result.data['pod_labels'] = meta.labels
        return result

    def _wait_caches(self, stop_time: float) -> None:
        """Block until both object caches report that they are ready.

        It waits on ``_condition`` between the checks.

        :param stop_time: absolute time (as returned by :func:`time.time`) when we must give up.
        :raises RetryFailedError: if ``stop_time`` is reached before both caches are ready.
        """
        while True:
            if self._pods.is_ready() and self._kinds.is_ready():
                return
            remaining = stop_time - time.time()
            if remaining <= 0:
                raise RetryFailedError('Exceeded retry deadline')
            self._condition.wait(remaining)

    def _cluster_from_nodes(self, group: str, nodes: Dict[str, K8sObject], pods: Collection[K8sObject]) -> Cluster:
        """Build the :class:`Cluster` object from cached K8s objects.

        .. note::
            When the leader object name matches :attr:`leader_path`, the method also updates
            ``_leader_resource_version``, ``_leader_observed_record`` and ``_leader_observed_time``.

        :param group: if not empty, it is appended (followed by ``-``) to the prefix of object names.
        :param nodes: mapping of object names to objects holding config, leader, failover and sync annotations.
        :param pods: Pods, every one of them becomes a cluster member.

        :returns: the :class:`Cluster` object.
        """
        members = [self.member(pod) for pod in pods]
        # common prefix of names of all objects that belong to the cluster
        path = self._base_path[1:] + '-'
        if group:
            path += group + '-'

        config = nodes.get(path + self._CONFIG)
        metadata = config and config.metadata
        annotations = metadata and metadata.annotations or {}

        # get initialize flag
        initialize = annotations.get(self._INITIALIZE)

        # get global dynamic configuration
        config = metadata and ClusterConfig.from_node(metadata.resource_version,
                                                      annotations.get(self._CONFIG) or '{}',
                                                      metadata.resource_version if self._CONFIG in annotations else 0)

        # get timeline history
        history = metadata and TimelineHistory.from_node(metadata.resource_version,
                                                         annotations.get(self._HISTORY) or '[]')

        # with Endpoints the leader object name is the prefix without the trailing dash
        leader_path = path[:-1] if self._api.use_endpoints else path + self._LEADER
        leader = nodes.get(leader_path)
        metadata = leader and leader.metadata
        if leader_path == self.leader_path:  # We want to memorize leader_resource_version only for our cluster
            self._leader_resource_version = metadata.resource_version if metadata else None
        annotations: Dict[str, str] = metadata and metadata.annotations or {}

        # get last known leader lsn and slots
        status = Status.from_node(annotations)

        # get failsafe topology
        try:
            failsafe = json.loads(annotations.get(self._FAILSAFE, ''))
        except Exception:
            # a missing or unparsable annotation means there is no failsafe topology
            failsafe = None

        # get leader
        leader_record: Dict[str, str] = {n: annotations[n] for n in (self._LEADER, 'acquireTime',
                                         'ttl', 'renewTime', 'transitions') if n in annotations}
        # We want to memorize leader_observed_record and update leader_observed_time only for our cluster
        if leader_path == self.leader_path and (leader_record or self._leader_observed_record)\
                and leader_record != self._leader_observed_record:
            self._leader_observed_record = leader_record
            self._leader_observed_time = time.time()

        leader = leader_record.get(self._LEADER)
        try:
            # a zero value in the annotation also falls back to our own ttl
            ttl = int(leader_record.get('ttl', self._ttl)) or self._ttl
        except (TypeError, ValueError):
            ttl = self._ttl

        # We want to check validity of the leader record only for our own cluster
        # The record is valid only if its last observed change is not older than ttl seconds
        if leader_path == self.leader_path and\
                not (metadata and self._leader_observed_time and self._leader_observed_time + ttl >= time.time()):
            leader = None

        if metadata:
            # fall back to a placeholder member when the leader isn't among known members
            member = Member(-1, leader or '', None, {})
            member = ([m for m in members if m.name == leader] or [member])[0]
            leader = Leader(metadata.resource_version, None, member)
        else:
            leader = None

        # failover key
        failover = nodes.get(path + self._FAILOVER)
        metadata = failover and failover.metadata
        failover = metadata and Failover.from_node(metadata.resource_version,
                                                   (metadata.annotations or EMPTY_DICT).copy())

        # get synchronization state
        sync = nodes.get(path + self._SYNC)
        metadata = sync and sync.metadata
        sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)

        return Cluster(initialize, config, leader, status, members, failover, sync, history, failsafe)

    def _postgresql_cluster_loader(self, path: Dict[str, Any]) -> Cluster:
        """Build the :class:`Cluster` object that represents a single PostgreSQL cluster.

        :param path: a mapping with the ``group``, ``nodes`` and ``pods`` keys describing what to load.

        :returns: :class:`Cluster` instance.
        """
        group, nodes, pods = path['group'], path['nodes'], path['pods']
        return self._cluster_from_nodes(group, nodes, pods.values())

    def _mpp_cluster_loader(self, path: Dict[str, Any]) -> Dict[int, Cluster]:
        """Build :class:`Cluster` objects for all groups of a single MPP cluster.

        Objects are distributed between groups according to the value of the MPP group label.
        Objects without the label or with a value not matching ``group_re`` are ignored.

        :param path: a mapping with the ``pods`` and ``nodes`` keys describing what to load.

        :returns: all MPP groups as :class:`dict`, with group IDs as keys and :class:`Cluster` objects as values.
        """
        grouped: Dict[str, Dict[str, Dict[str, K8sObject]]] = defaultdict(lambda: defaultdict(dict))

        # Pods are handled before the other objects, like in the per-section loops this replaces
        for section in ('pods', 'nodes'):
            for name, obj in path[section].items():
                group = obj.metadata.labels.get(self._mpp.k8s_group_label)
                if group and self._mpp.group_re.match(group):
                    grouped[group][section][name] = obj

        result: Dict[int, Cluster] = {}
        for group, value in grouped.items():
            result[int(group)] = self._cluster_from_nodes(group, value['nodes'], value['pods'].values())
        return result

    def __load_cluster(
            self, group: Optional[str], loader: Callable[[Dict[str, Any]], Union[Cluster, Dict[int, Cluster]]]
    ) -> Union[Cluster, Dict[int, Cluster]]:
        """Take a snapshot of both object caches and pass it to ``loader``.

        :param group: if set, only objects whose MPP group label equals this value are passed to ``loader``.
        :param loader: the callable that builds the result from a mapping with ``group``, ``pods`` and ``nodes``.

        :returns: whatever ``loader`` returns.
        :raises KubernetesError: if waiting for the caches, taking the snapshot or ``loader`` fails.
        """
        if TYPE_CHECKING:  # pragma: no cover
            assert self._retry.deadline is not None
        deadline = time.time() + self._retry.deadline
        self._api.refresh_api_servers_cache()

        def _selected(obj: K8sObject) -> bool:
            return not group or obj.metadata.labels.get(self._mpp.k8s_group_label) == group

        try:
            with self._condition:
                self._wait_caches(deadline)
                pods = {name: pod for name, pod in self._pods.copy().items() if _selected(pod)}
                nodes = {name: kind for name, kind in self._kinds.copy().items() if _selected(kind)}
            snapshot = {'group': group, 'pods': pods, 'nodes': nodes}
            return loader(snapshot)
        except Exception:
            logger.exception('get_cluster')
            raise KubernetesError('Kubernetes API is not responding properly')

    def _load_cluster(
            self, path: str, loader: Callable[[Any], Union[Cluster, Dict[int, Cluster]]]
    ) -> Union[Cluster, Dict[int, Cluster]]:
        """Load cluster(s) using ``loader``.

        Objects are restricted to our own MPP group only when MPP is enabled and
        ``path`` equals ``self.client_path('')``.

        :param path: the path that was requested.
        :param loader: the callable that builds the result.

        :returns: whatever ``loader`` returns.
        """
        group = None
        if self._mpp.is_enabled() and path == self.client_path(''):
            group = str(self._mpp.group)
        return self.__load_cluster(group, loader)

    def get_mpp_coordinator(self) -> Optional[Cluster]:
        """Load the PostgreSQL cluster for the MPP Coordinator.

        .. note::
            This method is only executed on the worker nodes to find the coordinator.

        :returns: the :class:`Cluster` instance of the MPP Coordinator group, or ``None`` if loading
                  failed (the error is logged).
        """
        try:
            coordinator_group = str(self._mpp.coordinator_group_id)
            ret = self.__load_cluster(coordinator_group, self._postgresql_cluster_loader)
            if TYPE_CHECKING:  # pragma: no cover
                assert isinstance(ret, Cluster)
        except Exception as exc:
            logger.error('Failed to load %s coordinator cluster from Kubernetes: %r', self._mpp.type, exc)
            return None
        return ret

    @staticmethod
    def compare_ports(p1: K8sObject, p2: K8sObject) -> bool:
        """Check that two ports have the same name, number and protocol (a missing protocol means ``TCP``).

        :param p1: the first port.
        :param p2: the second port.
        :returns: ``True`` if ports are equal.
        """
        if p1.name != p2.name or p1.port != p2.port:
            return False
        protocol1 = p1.protocol or 'TCP'
        protocol2 = p2.protocol or 'TCP'
        return protocol1 == protocol2

    @staticmethod
    def subsets_changed(last_observed_subsets: List[K8sObject], ip: str, ports: List[K8sObject]) -> bool:
        """Check whether observed subsets differ from the single-address subset we want to have.

        Subsets are considered unchanged only if there is exactly one subset with exactly one address equal
        to ``ip`` and the same set of ports (compared by name, the order doesn't matter).

        :param last_observed_subsets: subsets of the leader endpoint as they were last observed.
        :param ip: the IP address we expect to find in the subset.
        :param ports: ports we expect to find in the subset.
        :returns: ``True`` if the subsets must be updated.

        >>> ip = '1.2.3.4'
        >>> a = [k8s_client.V1EndpointAddress(ip=ip)]
        >>> s = [k8s_client.V1EndpointSubset(addresses=a)]
        >>> Kubernetes.subsets_changed(s, '1.2.3.5', [])
        True
        >>> Kubernetes.subsets_changed(s, ip, [k8s_client.V1EndpointPort(port=5432)])
        True
        >>> s = [k8s_client.V1EndpointSubset(addresses=a, ports=[k8s_client.V1EndpointPort(protocol='TCP', port=1)])]
        >>> Kubernetes.subsets_changed(s, '1.2.3.4', [k8s_client.V1EndpointPort(port=5432)])
        True
        >>> p1 = k8s_client.V1EndpointPort(name='port1', port=1)
        >>> p2 = k8s_client.V1EndpointPort(name='port2', port=2)
        >>> p3 = k8s_client.V1EndpointPort(name='port3', port=3)
        >>> s = [k8s_client.V1EndpointSubset(addresses=a, ports=[p1, p2])]
        >>> Kubernetes.subsets_changed(s, ip, [p2, p3])
        True
        >>> s2 = [k8s_client.V1EndpointSubset(addresses=a, ports=[p2, p1])]
        >>> Kubernetes.subsets_changed(s, ip, [p2, p1])
        False
        """

        if len(last_observed_subsets) != 1:
            return True

        subset = last_observed_subsets[0]
        addresses = subset.addresses or []
        # ``ports`` is an optional field of the subset: treat the missing value as "no ports"
        # instead of failing with TypeError on ``len(None)``
        observed = subset.ports or []
        if len(addresses) != 1 or addresses[0].ip != ip or len(observed) != len(ports):
            return True
        if len(ports) == 1:
            # names don't have to be looked up when there is just one port on each side
            return not Kubernetes.compare_ports(observed[0], ports[0])
        observed_ports = {p.name: p for p in observed}
        for p in ports:
            # pop() makes sure that every observed port is matched at most once
            if p.name not in observed_ports or not Kubernetes.compare_ports(p, observed_ports.pop(p.name)):
                return True
        return False

    def __target_ref(self, leader_ip: str, latest_subsets: List[K8sObject], pod: K8sObject) -> K8sObject:
        """Get the ``target_ref`` for the address of the leader.

        An existing reference is re-used if ``latest_subsets`` contain an address with ``leader_ip``
        whose ``target_ref`` points to our name. Otherwise a new reference to ``pod`` is created.

        :param leader_ip: the IP address of the leader.
        :param latest_subsets: subsets to search for an existing reference.
        :param pod: our Pod, provides ``uid`` and ``resource_version`` for the new reference.
        :returns: the object reference.
        """
        for subset in latest_subsets:
            for address in subset.addresses or []:
                if address.ip != leader_ip:
                    continue
                ref = address.target_ref
                if ref and ref.name == self._name:
                    return ref  # we want to re-use existing target_ref if possible

        metadata = pod.metadata
        return k8s_client.V1ObjectReference(kind='Pod', uid=metadata.uid, namespace=self._namespace,
                                            name=self._name, resource_version=metadata.resource_version)

    def _map_subsets(self, endpoints: Dict[str, Any], ips: List[str]) -> None:
        """Put ``subsets`` into the ``endpoints`` mapping when subsets of the leader endpoint must change.

        :param endpoints: keyword arguments for the future ``V1Endpoints`` object, updated in place.
        :param ips: an empty list asks to make subsets empty; otherwise the first element is the leader IP,
                    and if it is empty the IP address of our Pod from the cache is used instead.
        """
        leader = self._kinds.get(self.leader_path)
        latest_subsets = leader and leader.subsets or []

        if not ips:
            # We want to have subsets empty
            if latest_subsets:
                endpoints['subsets'] = []
            return

        pod = self._pods.get(self._name)
        leader_ip = ips[0] or pod and pod.status.pod_ip
        # don't touch subsets if our (leader) ip is unknown or subsets is valid
        if not leader_ip or not self.subsets_changed(latest_subsets, leader_ip, self.__ports):
            return

        kwargs = {}
        if pod:
            kwargs = {'hostname': pod.spec.hostname, 'node_name': pod.spec.node_name,
                      'target_ref': self.__target_ref(leader_ip, latest_subsets, pod)}
        address = k8s_client.V1EndpointAddress(ip=leader_ip, **kwargs)
        subset = k8s_client.V1EndpointSubset(addresses=[address], ports=self.__ports)
        endpoints['subsets'] = [subset]

    def _patch_or_create(self, name: str, annotations: Dict[str, Any],
                         resource_version: Optional[str] = None, patch: bool = False,
                         retry: Optional[Callable[..., Any]] = None, ips: Optional[List[str]] = None) -> K8sObject:
        """Patch an existing or create a new K8s object, Endpoint or ConfigMap.

        The object is patched when ``patch`` is set or ``resource_version`` is given, otherwise it is
        created (annotations with ``None`` values are left out in this case). A truthy result is also
        stored in the cache of objects.

        :param name: the name of the object.
        :param annotations: mapping of annotations that we want to create/update.
        :param resource_version: object should be updated only if the ``resource_version`` matches provided value.
        :param patch: ``True`` if we know in advance that the object already exists and we should patch it.
        :param retry: a callable that will take care of retries
        :param ips: IP address that we want to put to the subsets of the endpoint. Could have following values:

                    * ``None`` - when we don't need to touch subset;
                    * ``[]`` - to set subsets to the empty list, when :meth:`delete_leader` method is called;

                    * ``['ip.add.re.ss']`` - when we want to make sure that the subsets of the leader endpoint
                      contains the IP address of the leader, that we get from the ``kubernetes.pod_ip``;

                    * ``['']`` - when we want to make sure that the subsets of the leader endpoint contains the IP
                      address of the leader, but ``kubernetes.pod_ip`` configuration is missing. In this case we will
                      try to take the IP address of the Pod which name matches ``name`` from the config file.

        :returns: the new :class:`V1Endpoints` or :class:`V1ConfigMap` object, that was created or updated.
        """
        meta_kwargs = {'namespace': self._namespace, 'name': name, 'labels': self._labels}
        if patch or resource_version:
            func = functools.partial(self._api.patch_namespaced_kind, name)
            meta_kwargs['annotations'] = annotations
            if resource_version is not None:
                meta_kwargs['resource_version'] = resource_version
        else:
            func = functools.partial(self._api.create_namespaced_kind)
            # skip annotations with null values
            meta_kwargs['annotations'] = {k: v for k, v in annotations.items() if v is not None}
        metadata = k8s_client.V1ObjectMeta(**meta_kwargs)

        if not self._api.use_endpoints:
            body = k8s_client.V1ConfigMap(metadata=metadata)
        else:
            endpoints = {'metadata': metadata}
            if ips is not None:
                self._map_subsets(endpoints, ips)
            body = k8s_client.V1Endpoints(**endpoints)

        if retry:
            ret = retry(func, self._namespace, body)
        else:
            ret = func(self._namespace, body)

        if ret:
            self._kinds.set(name, ret)
        return ret

    @catch_kubernetes_errors
    def patch_or_create(self, name: str, annotations: Dict[str, Any], resource_version: Optional[str] = None,
                        patch: bool = False, retry: bool = True, ips: Optional[List[str]] = None) -> K8sObject:
        """Wrapper around :meth:`_patch_or_create` that reacts to conflicts of ``resource_version``.

        When the API responds with HTTP 409 to a request that carried ``resource_version``, watchers
        of both object caches are terminated. The exception is re-raised in any case.

        :param name: the name of the object.
        :param annotations: mapping of annotations that we want to create/update.
        :param resource_version: expected ``resource_version`` of the object.
        :param patch: ``True`` if the object should be patched.
        :param retry: whether :meth:`retry` should be used to execute the request.
        :param ips: passed to :meth:`_patch_or_create` unchanged.
        :returns: the result of :meth:`_patch_or_create`.
        """
        retry_func = self.retry if retry else None
        try:
            return self._patch_or_create(name, annotations, resource_version, patch, retry_func, ips)
        except k8s_client.rest.ApiException as e:
            conflict = e.status == 409 and resource_version  # Conflict in resource_version
            if conflict:
                # Terminate watchers, it could be a sign that K8s API is in a failed state
                self._kinds.kill_stream()
                self._pods.kill_stream()
            raise e

    def patch_or_create_config(self, annotations: Dict[str, Any],
                               resource_version: Optional[str] = None, patch: bool = False, retry: bool = True) -> bool:
        """Patch or create the config object.

        :param annotations: mapping of annotations that we want to create/update.
        :param resource_version: expected ``resource_version`` of the object.
        :param patch: ``True`` if the object should be patched.
        :param retry: whether the request should be retried.
        :returns: ``True`` if :meth:`patch_or_create` returned a truthy value.
        """
        creating = not (patch or resource_version)
        if creating and self._api.use_endpoints:
            # SCOPE-config endpoint requires corresponding service otherwise it might be "cleaned" by k8s master
            self._should_create_config_service = True
            self._create_config_service()
        result = self.patch_or_create(self.config_path, annotations, resource_version, patch, retry)
        return bool(result)

    def _create_config_service(self) -> None:
        """Try to create the headless service named after the config object.

        The ``_should_create_config_service`` flag is reset when the request returned a truthy value
        or failed with HTTP 409 or 403. A falsy result leaves the flag untouched; any other failure
        is logged and also leaves the flag untouched.
        """
        metadata = k8s_client.V1ObjectMeta(namespace=self._namespace, name=self.config_path, labels=self._labels)
        spec = k8s_client.V1ServiceSpec(cluster_ip='None')
        body = k8s_client.V1Service(metadata=metadata, spec=spec)
        try:
            created = self._api.create_namespaced_service(self._namespace, body)
        except Exception as exc:
            # 409 - service already exists, 403 - creation forbidden
            tolerated = isinstance(exc, k8s_client.rest.ApiException) and exc.status in (409, 403)
            if not tolerated:
                logger.exception('create_config_service failed')
                return
        else:
            if not created:
                return
        self._should_create_config_service = False

    def _write_leader_optime(self, last_lsn: str) -> bool:
        """Unused: always raises :exc:`NotImplementedError`."""
        raise NotImplementedError  # pragma: no cover

    def _write_status(self, value: str) -> bool:
        """Unused: always raises :exc:`NotImplementedError`."""
        raise NotImplementedError  # pragma: no cover

    def _write_failsafe(self, value: str) -> bool:
        """Unused: always raises :exc:`NotImplementedError`."""
        raise NotImplementedError  # pragma: no cover

    def _update_leader(self, leader: Leader) -> bool:
        """Unused: always raises :exc:`NotImplementedError`."""
        raise NotImplementedError  # pragma: no cover

    def write_leader_optime(self, last_lsn: int) -> None:
        """Store the WAL LSN in the ``optime`` annotation of the leader object.

        The leader object is patched without retries.

        :param last_lsn: absolute WAL LSN in bytes.
        """
        annotations = {self._OPTIME: str(last_lsn)}
        self.patch_or_create(self.leader_path, annotations, patch=True, retry=False)

    def _update_leader_with_retry(self, annotations: Dict[str, Any],
                                  resource_version: Optional[str], ips: List[str]) -> bool:
        """Update the leader object, with one more attempt after a conflict of ``resource_version``.

        All requests share one copy of the retry object, i.e. they share the same retry deadline.

        :param annotations: annotations to write to the leader object.
        :param resource_version: expected ``resource_version`` of the leader object.
        :param ips: passed as ``ips`` to :meth:`_patch_or_create`.

        :returns: ``True`` if the leader object was updated, ``False`` otherwise.
        :raises KubernetesError: if a request fails with :exc:`RetryFailedError` or :exc:`K8sException`.
        """
        retry = self._retry.copy()

        def _retry(*args: Any, **kwargs: Any) -> Any:
            # hand the shared retry object over to the called method
            kwargs['_retry'] = retry
            return retry(*args, **kwargs)

        try:
            return bool(self._patch_or_create(self.leader_path, annotations, resource_version, ips=ips, retry=_retry))
        except k8s_client.rest.ApiException as e:
            if e.status == 409:
                logger.warning('Concurrent update of %s', self.leader_path)
            else:
                logger.exception('Permission denied' if e.status == 403 else 'Unexpected error from Kubernetes API')
                return False
        except (RetryFailedError, K8sException) as e:
            raise KubernetesError(e)

        # if we are here, that means update failed with 409
        if not retry.ensure_deadline(1):
            return False  # No time for retry. Tell ha.py that we have to demote due to failed update.

        # Try to get the latest version directly from K8s API instead of relying on async cache
        try:
            kind = _retry(self._api.read_namespaced_kind, self.leader_path, self._namespace)
        except (RetryFailedError, K8sException) as e:
            raise KubernetesError(e)
        except Exception as e:
            logger.error('Failed to get the leader object "%s": %r', self.leader_path, e)
            return False

        # refresh the cache with the object we just got
        self._kinds.set(self.leader_path, kind)

        if not retry.ensure_deadline(0.5):
            return False

        kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT
        kind_resource_version = kind and kind.metadata.resource_version

        # There is different leader or resource_version in cache didn't change
        if kind and (kind_annotations.get(self._LEADER) != self._name or kind_resource_version == resource_version):
            return False

        # the second attempt uses the fresh resource_version
        # NOTE(review): error handling of this attempt is delegated to _run_and_handle_exceptions - confirm there
        return bool(_run_and_handle_exceptions(self._patch_or_create, self.leader_path, annotations,
                                               kind_resource_version, ips=ips, retry=_retry))

    def update_leader(self, leader: Leader, last_lsn: Optional[int],
                      slots: Optional[Dict[str, int]] = None, failsafe: Optional[Dict[str, str]] = None) -> bool:
        """Renew the leader record in the leader object.

        :param leader: not used by this implementation.
        :param last_lsn: if truthy, it is written to the ``optime`` annotation together with ``slots``.
        :param slots: values for the ``slots`` annotation; the annotation is set to ``None`` when empty.
        :param failsafe: if not ``None``, it is written to the failsafe annotation (``None`` when empty).
        :returns: ``False`` if the cached leader object names a different leader, otherwise the result
                  of :meth:`_update_leader_with_retry`.
        """
        kind = self._kinds.get(self.leader_path)
        kind_annotations = kind and kind.metadata.annotations or EMPTY_DICT

        if kind and kind_annotations.get(self._LEADER) != self._name:
            return False

        now = datetime.datetime.now(tzutc).isoformat()
        record = kind_annotations or self._leader_observed_record
        annotations = {self._LEADER: self._name, 'ttl': str(self._ttl), 'renewTime': now}
        annotations['acquireTime'] = record.get('acquireTime') or now
        annotations['transitions'] = record.get('transitions') or '0'

        if last_lsn:
            annotations[self._OPTIME] = str(last_lsn)
            annotations['slots'] = json.dumps(slots, separators=(',', ':')) if slots else None

        if failsafe is not None:
            annotations[self._FAILSAFE] = json.dumps(failsafe, separators=(',', ':')) if failsafe else None

        resource_version = kind and kind.metadata.resource_version
        return self._update_leader_with_retry(annotations, resource_version, self.__ips)

    def attempt_to_acquire_leader(self) -> bool:
        """Try to write our name into the leader object.

        The ``transitions`` counter is taken from the last observed leader record and incremented if
        the record names a different leader. ``acquireTime`` is carried over if the record names us.

        :returns: ``True`` if the leader object was created or updated.
        :raises KubernetesError: if the request fails with :exc:`RetryFailedError` or :exc:`K8sException`.
        """
        now = datetime.datetime.now(tzutc).isoformat()
        acquire_time = now
        transitions = 0

        observed = self._leader_observed_record
        if observed:
            try:
                transitions = int(observed.get('transitions', ''))
            except (TypeError, ValueError):
                transitions = 0

            if observed.get(self._LEADER) == self._name:
                acquire_time = observed.get('acquireTime') or now
            else:
                transitions += 1

        annotations = {self._LEADER: self._name, 'ttl': str(self._ttl), 'renewTime': now,
                       'acquireTime': acquire_time, 'transitions': str(transitions)}

        try:
            ret = bool(self._patch_or_create(self.leader_path, annotations,
                                             self._leader_resource_version, retry=self.retry, ips=self.__ips))
        except k8s_client.rest.ApiException as e:
            ret = False
            if e.status == 409 and self._leader_resource_version:  # Conflict in resource_version
                # Terminate watchers, it could be a sign that K8s API is in a failed state
                self._kinds.kill_stream()
                self._pods.kill_stream()
        except (RetryFailedError, K8sException) as e:
            raise KubernetesError(e)

        if not ret:
            logger.info('Could not take out TTL lock')
        return ret

    def take_leader(self) -> bool:
        """Same as :meth:`attempt_to_acquire_leader`.

        :returns: the result of :meth:`attempt_to_acquire_leader`.
        """
        acquired = self.attempt_to_acquire_leader()
        return acquired

    def set_failover_value(self, value: str, version: Optional[str] = None) -> bool:
        """Unused: always raises :exc:`NotImplementedError`."""
        raise NotImplementedError  # pragma: no cover

    def manual_failover(self, leader: Optional[str], candidate: Optional[str],
                        scheduled_at: Optional[datetime.datetime] = None, version: Optional[str] = None) -> bool:
        """Write the failover request into annotations of the failover object.

        The object is patched when ``version`` is given or the known cluster already has a failover
        object with a version, otherwise it is created. The request is not retried.

        :param leader: value for the ``leader`` annotation, empty values are stored as ``None``.
        :param candidate: value for the ``member`` annotation, empty values are stored as ``None``.
        :param scheduled_at: if set, its ISO representation is stored in the ``scheduled_at`` annotation.
        :param version: expected ``resource_version`` of the failover object.
        :returns: ``True`` if the object was successfully written.
        """
        annotations = {
            'leader': leader or None,
            'member': candidate or None,
            'scheduled_at': scheduled_at and scheduled_at.isoformat(),
        }
        already_exists = bool(self.cluster and isinstance(self.cluster.failover, Failover)
                              and self.cluster.failover.version)
        should_patch = bool(version or already_exists)
        result = self.patch_or_create(self.failover_path, annotations, version, should_patch, False)
        return bool(result)

    @property
    def _config_resource_version(self) -> Optional[str]:
        """``resource_version`` of the cached config object, or a falsy value if it isn't in the cache."""
        config = self._kinds.get(self.config_path)
        if not config:
            return config
        return config.metadata.resource_version

    def set_config_value(self, value: str, version: Optional[str] = None) -> bool:
        """Write ``value`` to the config annotation of the config object.

        The object is patched if it is found in the cache. The request is not retried.

        :param value: the new value of the annotation.
        :param version: expected ``resource_version`` of the config object.
        :returns: the result of :meth:`patch_or_create_config`.
        """
        config_exists = bool(self._config_resource_version)
        annotations = {self._CONFIG: value}
        return self.patch_or_create_config(annotations, version, config_exists, False)

    @catch_kubernetes_errors
    def touch_member(self, data: Dict[str, Any]) -> bool:
        """Make sure that role labels and the ``status`` annotation of our Pod are up to date.

        The Pod is patched only when the labels or the member data known from the cluster view differ
        from what we want to have. If the config service still has to be created, it is attempted here.

        :param data: the member data; ``role`` and ``state`` keys are used to choose values of role labels.
        :returns: ``True`` if the Pod is up to date or was successfully patched.
        """
        cluster = self.cluster
        if cluster and cluster.leader and cluster.leader.name == self._name:
            if data['role'] == 'standby_leader':
                role = self._standby_leader_label_value
            else:
                role = self._leader_label_value
            tmp_role = 'master'
        elif data['state'] == 'running' and data['role'] not in ('master', 'primary'):
            tmp_role = data['role']
            role = self._follower_label_value if tmp_role == 'replica' else tmp_role
        else:
            role = tmp_role = None

        role_labels = {self._role_label: role}
        if self._tmp_role_label:
            role_labels[self._tmp_role_label] = tmp_role

        member = cluster and cluster.get_member(self._name, fallback_to_leader=False)
        ret = False
        if member:
            # pod_labels are removed from the member data, they must not take part in the comparison with data
            pod_labels = member.data.pop('pod_labels', None)
            ret = pod_labels is not None\
                and all(pod_labels.get(k) == v for k, v in role_labels.items())\
                and deep_compare(data, member.data)

        if not ret:
            status = json.dumps(data, separators=(',', ':'))
            metadata = {'namespace': self._namespace, 'name': self._name, 'labels': role_labels,
                        'annotations': {'status': status}}
            body = k8s_client.V1Pod(metadata=k8s_client.V1ObjectMeta(**metadata))
            ret = self._api.patch_namespaced_pod(self._name, self._namespace, body)
            if ret:
                self._pods.set(self._name, ret)

        if self._should_create_config_service:
            self._create_config_service()
        return bool(ret)

    def initialize(self, create_new: bool = True, sysid: str = "") -> bool:
        """Write ``sysid`` to the initialize annotation of the config object.

        The version of the cluster config, if it is known, is used as the expected ``resource_version``.

        :param create_new: not used by this implementation.
        :param sysid: the value to store.
        :returns: the result of :meth:`patch_or_create_config`.
        """
        cluster = self.cluster
        resource_version = None
        if cluster and cluster.config and cluster.config.version:
            resource_version = str(cluster.config.version)
        annotations = {self._INITIALIZE: sysid}
        return self.patch_or_create_config(annotations, resource_version)

    def _delete_leader(self, leader: Leader) -> bool:
        """Unused: always raises :exc:`NotImplementedError`."""
        raise NotImplementedError  # pragma: no cover

    def delete_leader(self, leader: Optional[Leader], last_lsn: Optional[int] = None) -> bool:
        """Remove our name from the leader object and make its subsets empty.

        Nothing is done unless the cached leader object names us as the leader. After the request
        the cluster view is reset.

        :param leader: not used by this implementation.
        :param last_lsn: if truthy, it is also written to the ``optime`` annotation.
        :returns: ``False`` if nothing was done, otherwise the result of :meth:`patch_or_create`.
        """
        kind = self._kinds.get(self.leader_path)
        if not kind or (kind.metadata.annotations or EMPTY_DICT).get(self._LEADER) != self._name:
            return False

        annotations: Dict[str, Optional[str]] = {self._LEADER: None}
        if last_lsn:
            annotations[self._OPTIME] = str(last_lsn)
        resource_version = kind.metadata.resource_version
        ret = self.patch_or_create(self.leader_path, annotations, resource_version, True, False, [])
        self.reset_cluster()
        return ret

    def cancel_initialization(self) -> bool:
        """Reset the initialize annotation of the config object to ``None`` by patching the object.

        :returns: the result of :meth:`patch_or_create_config`.
        """
        annotations = {self._INITIALIZE: None}
        return self.patch_or_create_config(annotations, None, True)

    @catch_kubernetes_errors
    def delete_cluster(self) -> bool:
        """Delete, with retries, all objects in our namespace that match the label selector.

        :returns: ``True`` if the delete request returned a truthy value.
        """
        deleted = self.retry(self._api.delete_collection_namespaced_kind,
                             self._namespace, label_selector=self._label_selector)
        return bool(deleted)

    def set_history_value(self, value: str) -> bool:
        """Write ``value`` to the history annotation of the config object.

        The object is patched if it is found in the cache. The request is not retried.

        :param value: the new value of the annotation.
        :returns: the result of :meth:`patch_or_create_config`.
        """
        config_exists = bool(self._config_resource_version)
        annotations = {self._HISTORY: value}
        return self.patch_or_create_config(annotations, None, config_exists, False)

    def set_sync_state_value(self, value: str, version: Optional[str] = None) -> bool:
        """Unused: always raises :exc:`NotImplementedError`."""
        raise NotImplementedError  # pragma: no cover

    def write_sync_state(self, leader: Optional[str], sync_standby: Optional[Collection[str]],
                         version: Optional[str] = None) -> Optional[SyncState]:
        """Build annotations of the sync state and write them to $SCOPE-sync Endpoint or ConfigMap.

        :param leader: name of the leader node that manages /sync key
        :param sync_standby: collection of currently known synchronous standby node names
        :param version: last known `resource_version` for conditional update of the object
        :returns: the new :class:`SyncState` object, or ``None`` if :meth:`patch_or_create` returned a boolean.
        """
        sync_state = self.sync_state(leader, sync_standby)
        ret = self.patch_or_create(self.sync_path, sync_state, version, False)
        if isinstance(ret, bool):
            return None
        return SyncState.from_node(ret.metadata.resource_version, sync_state)

    def delete_sync_state(self, version: Optional[str] = None) -> bool:
        """Write empty values to annotations of $SCOPE-sync Endpoint or ConfigMap.

        Effectively it removes "leader" and "sync_standby" annotations from the object.

        :param version: last known `resource_version` for conditional update of the object
        :returns: `True` if "delete" was successful
        """
        new_state = self.write_sync_state(None, None, version=version)
        return new_state is not None

    def watch(self, leader_version: Optional[str], timeout: float) -> bool:
        """Wait for the next event or for the timeout, delegating to the parent class.

        If the "do not watch" flag is set, the flag is cleared and ``True`` is returned immediately.
        ``self.event`` is always cleared after the wait in the parent class.

        :param leader_version: if truthy, ``timeout`` is extended by half a second.
        :param timeout: how long to wait, in seconds.
        :returns: ``True`` if the wait was skipped, otherwise the result of the parent's ``watch()``.
        """
        if self.__do_not_watch:
            self.__do_not_watch = False
            return True

        # We want to give a bit more time to non-leader nodes to synchronize HA loops
        effective_timeout = timeout + 0.5 if leader_version else timeout

        try:
            return super(Kubernetes, self).watch(None, effective_timeout)
        finally:
            self.event.clear()
